package com.xiaohu.transfrom.streamtranform;

import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

/*
为了处理更加灵活，连接操作允许流的数据类型不同。
但我们知道一个DataStream中的数据只能有唯一的类型，所以连接得到的并不是DataStream,而是一个“连接流”。
连接流可以看成是两条流形式上的“统一”,被放在了一个同一个流中；事实上内部仍保持各自的数据形式不变,彼此之间是相互独立的。
要想得到新的DataStream,还需要进一步定义一个“同处理” （co-process）转换操作,用来说明对于不同来源、不同类型的数据,怎样分别进行处理转换、得到统一的输出类型。
所以整体上来，两条流的连接就像是“一国两制”，两条流可以保持各自的数据类型、处理方式也可以不同不过最终还是会统一到同一个DataStream中。


 */
public class ConnectDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
//        DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3, 4, 5);
//        DataStreamSource<Integer> source2 = env.fromElements(11, 22, 33, 44, 55);
//        DataStreamSource<String> source3 = env.fromElements("111", "222", "333");

        DataStreamSource<String> source1 = env.socketTextStream("master", 7777);
        DataStreamSource<String> source2 = env.socketTextStream("master", 8888);

        //connect可以链接不同数据类型的流，但是返回值类型不再是DataStream类型了，就不能进行union合并流
        //一次只能连接两条流
//        ConnectedStreams<Integer, String> cs1 = source1.connect(source3);

        //可以采用先union合并，再进行connect不同数据类型的流
//        ConnectedStreams<Integer, String> cs2 = source1.union(source2).connect(source3);

        ConnectedStreams<String, String> cs = source1.connect(source2);

        //Integer: 第一个流的类型
        //String: 第二个流的类型
        //Object: 输出的类型
        //ConnectedStreams->经过map/flatmap/等转换算子可以变成DataStream
        SingleOutputStreamOperator<String> ds1 = cs.map(new CoMapFunction<String, String, String>() {
            @Override
            public String map1(String value) throws Exception {
                return "来源于source1端口7777：" + value;
            }

            @Override
            public String map2(String value) throws Exception {
                return "来源于source1端口8888：" + value;
            }
        });

        ds1.print();


        env.execute();
    }
}
